package com.newegg.hardware.benchmark.service;

import java.util.HashMap;
import java.util.Map;
import java.util.UUID;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.scheduling.annotation.Async;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;
import org.springframework.stereotype.Service;
import org.springframework.transaction.annotation.Transactional;
import com.alibaba.fastjson.JSON;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.google.common.cache.Cache;
import com.google.common.cache.CacheBuilder;
import com.newegg.hardware.benchmark.dao.ItemmarkMapper;
import com.newegg.hardware.benchmark.model.Hardmark;
import com.newegg.hardware.benchmark.model.HardwareScore;
import com.newegg.hardware.benchmark.model.ItemSpec;
import com.newegg.hardware.benchmark.model.Itemmark;
import com.newegg.hardware.benchmark.model.Score;
import com.newegg.hardware.benchmark.model.Type;
import com.newegg.hardware.benchmark.util.JSONUtil;

@Service
public class ScoreService {
	static Cache<String, Score> CACHE = CacheBuilder.newBuilder().maximumSize(100000).build();
	@Autowired
    ThreadPoolTaskExecutor taskpool;
	@Autowired
	CpuScoreService cpuScoreService;
	@Autowired
	GpuScoreService gpuScoreService;
	@Autowired
	MemoryScoreService memoyScoreService;
	@Autowired
	DiskScoreService diskScoreService;
	@Autowired
    KafkaTemplate<String, String> kafkaTemplate;
	@Autowired
	ItemmarkMapper itemmarkService;
	@Autowired
	ItemService itemService;
	@Value("${spring.kafka.consumer.topic}")
	String topic;
	@Value("${spring.cache.enable:false}")
	boolean useCache;
	
	private Hardmark findHardmark(Type type, String name) {
		switch (type) {
			case CPU: return cpuScoreService.getScoreByName(name);
			case GPU: return gpuScoreService.getScoreByName(name);
			case MEMORY: return memoyScoreService.getScoreByName(name);
			case DISK: return diskScoreService.getScoreByName(name);
			default: throw new IllegalArgumentException("type error");
		}
	}
	
	private int findMeanByName(Type type) {
		switch (type) {
			case CPU: return cpuScoreService.getMeanScore();
			case GPU: return gpuScoreService.getMeanScore();
			case MEMORY: return memoyScoreService.getMeanScore();
			case DISK: return diskScoreService.getMeanScore();
			default: throw new IllegalArgumentException("type error");
		}
	}
	
	private int findStarByName(Type type, int score) {
		switch (type) {
			case CPU: return cpuScoreService.getStar(score);
			case GPU: return gpuScoreService.getStar(score);
			case MEMORY: return memoyScoreService.getStar(score);
			case DISK: return diskScoreService.getStar(score);
			default: throw new IllegalArgumentException("type error");
		}
	}
	
	public Score findByName(Type type, String name) {
		String CACHE_NAME = "_FINDBYNAME" + type.name() + ":" + name;
		Score score = CACHE.getIfPresent(CACHE_NAME);
		if(useCache && score != null) { return score; }
		Hardmark hardmark = findHardmark(type, name);
		if(hardmark == null) { return null; }
		int mean = findMeanByName(type);
		score = new Score();
		score.setMean(mean);
		score.setScore(hardmark.getMark());
		score.setStar(findStarByName(type, score.getScore()));
		score.setSimulate(hardmark.getSimulate());
		CACHE.put(CACHE_NAME, score);
		return score;
	}
	
	@Transactional(rollbackFor=Exception.class)
	public HardwareScore getScore(JSONObject json, Map<String, String> headers) {
    	JSONArray cpus = json.getJSONArray("cpu");
    	JSONArray gpus = json.getJSONArray("gpu");
    	JSONArray memorys = json.getJSONArray("memory");
    	JSONArray disks = json.getJSONArray("disk");
    	Integer cpu_score = json.getInteger("cpu_score");
    	Integer disk_score = json.getInteger("disk_score");
    	Integer memory_score = json.getInteger("memory_score");
    	Integer gpu_score = json.getInteger("gpu_score");
    	HardwareScore score = new HardwareScore();
    	score.setCpu(cpuScoreService.getScore(cpus, cpu_score));
    	score.setGpu(gpuScoreService.getScore(gpus, gpu_score));
    	score.setDisk(diskScoreService.getScore(disks, disk_score));
    	score.setMemory(memoyScoreService.getScore(memorys, memory_score));
    	putKafka(json, score, headers);
    	return score;
	}
	
	@Async
	private void putKafka(JSONObject json, HardwareScore score, Map<String, String> headers) {
		String id = UUID.randomUUID().toString();
		JSONArray cpus = json.getJSONArray("cpu");
    	JSONArray gpus = json.getJSONArray("gpu");
    	JSONArray memorys = json.getJSONArray("memory");
    	JSONArray disks = json.getJSONArray("disk");
    	JSONObject baseboard = json.getJSONObject("baseboard");
    	String productModel = baseboard.getString("ProductModel");
    	if(productModel == null) {
    		productModel = baseboard.getString("SerialNumber");
    	}
    	final String DEVICE = productModel;
    	taskpool.execute(new Runnable() {
			@Override
			public void run() {
		    	JSONObject computer = json.getJSONObject("system");
		    	computer.put("DEVICE", DEVICE);
		    	computer.put("UUID", id);
		    	computer.putAll(headers);
				kafkaTemplate.send(topic, id + "-SYSTEM", JSON.toJSONString(computer));
				JSONObject board = json.getJSONObject("baseboard");
				board.put("DEVICE", DEVICE);
		    	board.put("UUID", id);
		    	board.putAll(headers);
				kafkaTemplate.send(topic, id + "-BASEBOARD", JSON.toJSONString(board));
		    	JSONObject bios = json.getJSONObject("bios");
		    	bios.put("DEVICE", DEVICE);
		    	bios.put("UUID", id);
		    	bios.putAll(headers);
				kafkaTemplate.send(topic, id + "-BIOS", JSON.toJSONString(bios));
		    	JSONObject keyboard = json.getJSONObject("keyboard");
		    	keyboard.put("DEVICE", DEVICE);
		    	keyboard.put("UUID", id);
		    	keyboard.putAll(headers);
				kafkaTemplate.send(topic, id + "-KEYBOARD", JSON.toJSONString(keyboard));
				JSONObject mouse = json.getJSONObject("mouse");
				mouse.put("DEVICE", DEVICE);
				mouse.put("UUID", id);
				mouse.putAll(headers);
				kafkaTemplate.send(topic, id + "-MOUSE", JSON.toJSONString(mouse));
				JSONUtil.foreach(cpus, (data, index)->{
					data.put("DEVICE", DEVICE);
					data.put("UUID", id);
					data.put("star", score.getCpu().getStar());
					data.put("score", score.getCpu().getScore());
					data.put("mean", score.getCpu().getMean());
					data.putAll(headers);
					kafkaTemplate.send(topic, id + "-CPU-" + index, JSON.toJSONString(data));
				});
				JSONUtil.foreach(gpus, (data, index)->{
					data.put("DEVICE", DEVICE);
					data.put("UUID", id);
					data.put("star", score.getGpu().getStar());
					data.put("score", score.getGpu().getScore());
					data.put("mean", score.getGpu().getMean());
					data.putAll(headers);
					kafkaTemplate.send(topic, id + "-GPU-" + index, JSON.toJSONString(data));
				});
				JSONUtil.foreach(memorys, (data, index)->{
					data.put("DEVICE", DEVICE);
					data.put("UUID", id);
					data.put("star", score.getMemory().getStar());
					data.put("score", score.getMemory().getScore());
					data.put("mean", score.getMemory().getMean());
					data.putAll(headers);
					kafkaTemplate.send(topic, id + "-MEMORY-" + index, JSON.toJSONString(data));
				});
		    	JSONArray displays = json.getJSONArray("display");
				JSONUtil.foreach(displays, (data, index)->{
					data.put("DEVICE", DEVICE);
					data.put("UUID", id);
					data.putAll(headers);
					kafkaTemplate.send(topic, id + "-DISPLAY-" + index, JSON.toJSONString(data));
				});
				JSONUtil.foreach(disks, (data, index)->{
					data.put("DEVICE", DEVICE);
					data.put("UUID", id);
					data.put("star", score.getDisk().getStar());
					data.put("score", score.getDisk().getScore());
					data.put("mean", score.getDisk().getMean());
					data.putAll(headers);
					kafkaTemplate.send(topic, id + "-DISK-" + index, JSON.toJSONString(data));
				});
				JSONArray sounds = json.getJSONArray("sound");
				JSONUtil.foreach(sounds, (data, index)->{
					data.put("DEVICE", DEVICE);
					data.put("UUID", id);
					data.putAll(headers);
					kafkaTemplate.send(topic, id + "-SOUND-" + index, JSON.toJSONString(data));
				});
				JSONArray medias = json.getJSONArray("media");
				JSONUtil.foreach(medias, (data, index)->{
					data.put("DEVICE", DEVICE);
					data.put("UUID", id);
					data.putAll(headers);
					kafkaTemplate.send(topic, id + "-MEDIA-" + index, JSON.toJSONString(data));
				});
				JSONArray networks = json.getJSONArray("network");
				JSONUtil.foreach(networks, (data, index)->{
					data.put("DEVICE", DEVICE);
					data.put("UUID", id);
					data.putAll(headers);
					kafkaTemplate.send(topic, id + "-NETWORK-" + index, JSON.toJSONString(data));
				});
			}
		});
	}

	@Transactional(rollbackFor=Exception.class)
	public Score findByItem(String itemnumber) {
		String CACHE_NAME = "_FINDBYITEM" + itemnumber;
		Score score = CACHE.getIfPresent(CACHE_NAME);
		if(useCache && score != null) { return score; }
		
		Itemmark itemmark = itemmarkService.get(itemnumber);
		if(itemmark != null) {
			score = findByName(Type.valueOf(itemmark.getType()), itemmark.getName());
			if(score == null) {
				score = new Score();
				score.setScore(itemmark.getMark());
				score.setStar(6);
				score.setSimulate(itemmark.getSimulate());
				switch (itemmark.getType()) {
					case "CPU": score.setMean(cpuScoreService.getMeanScore()); break;
					case "GPU": score.setMean(gpuScoreService.getMeanScore()); break;
					case "MEMORY": score.setMean(memoyScoreService.getMeanScore()); break;
					case "DISK": score.setMean(diskScoreService.getMeanScore()); break;
					default: throw new IllegalArgumentException("type error");
				}
			}
		}else {
			ItemSpec spec = itemService.getItem(itemnumber);
			if(spec == null) { return null; }
			JSONArray array = new JSONArray();
			array.add(spec.getSpec());
			switch (spec.getType()) {
				case CPU: score = cpuScoreService.getScore(array, null); break;
				case GPU: score = gpuScoreService.getScore(array, null); break;
				case MEMORY: score = memoyScoreService.getScore(array, null); break;
				case DISK: score = diskScoreService.getScore(array, null); break;
				default: throw new IllegalArgumentException("type error");
			}
			itemmark = new Itemmark();
			itemmark.setId(itemnumber);
			itemmark.setName(spec.getName().toLowerCase());
			itemmark.setSimulate(score.isSimulate());
			itemmark.setType(spec.getType().name());
			itemmark.setMark(score.getScore());
			itemmarkService.insert(itemmark);
		}
		CACHE.put(CACHE_NAME, score);
		return score;
	}

	public void report(Object obj, Map<String, String> headers) {
		Map<String, Object> data = new HashMap<String, Object>();
		data.putAll(headers);
		data.put("data", obj);
		kafkaTemplate.send(topic + "_report", UUID.randomUUID().toString(), JSON.toJSONString(data));
	}
}
